1. Background

While recently evaluating SeaTunnel, I noticed that the new version ships a web service (seatunnel-web) for creating and managing data-sync jobs through a UI. It includes a log module that shows each job's execution status, including the number of rows read and the number of rows written. I was curious where those numbers come from. Tracing the source led me to Hazelcast, so I dug into it.

2. What is Hazelcast?

Hazelcast is an open-source distributed In-Memory Data Grid (IMDG), used mainly for distributed computing and caching:

- Distributed data structures: Hazelcast provides distributed Map, List, Set, Queue, and so on, stored and accessed across the cluster.
- Caching: data is kept in memory for fast access, with policies such as LRU (Least Recently Used), LFU (Least Frequently Used), and TTL (Time to Live).
- Distributed computing: computation can be spread across cluster members and executed in parallel, increasing the application's throughput.
- High availability: replication and failover mechanisms keep data reliable and available; automatic failure detection and recovery migrate data and tasks when a member goes down.
- Scalability: horizontal scaling by adding members; members can join and leave dynamically without stopping the application.
- Integrations: Spring, Hibernate, JCache, and interoperability with other distributed systems such as Apache Kafka and Apache Ignite.
- Multi-language clients: Java, C#, C++, Python, Node.js, and more.

3. Use cases

- Caching: a high-performance distributed cache for an application's hot data.
- Distributed computing: parallel processing across the cluster, common in finance, telecom, and e-commerce.
- Real-time data processing: handling and analyzing real-time streams, e.g. monitoring or recommendation systems.
- Distributed session management: shared sessions with load balancing.
- Distributed data store: sharing data across multiple nodes.

4. Compared with Redis

Hazelcast can be viewed as a NoSQL store, which invites comparison with the one we all use most: Redis. Both provide rich data structures (map, list, and so on), so why not just use Redis? A few reasons as I see them:

- Redis requires separate infrastructure, while embedded Hazelcast needs no extra components: it works out of the box.
- Hazelcast uses the application server's own memory (a bit like Caffeine in that respect) and scales without an external memory store.
- Hazelcast's expiry support is less flexible than Redis's.
- Hazelcast supports distributed computation: you can store data on multiple members and use the distributed-computing API to read from all of them, compute, and return a result. That counts as an advantage over Redis.
- Redis can be shared by multiple applications and is decoupled from them; Hazelcast is typically embedded in the application.

If you don't need distributed computing, pick whichever is more convenient. If your company has no shared infrastructure team and the product belongs to your own line of business, Hazelcast spares you the setup, operations, and management of Redis; otherwise, stick with Redis. But when real-time stream processing is involved, Hazelcast's distributed features are a good choice. Say you're building a monitoring system that must process data from many business systems: you can't do that purely in Redis, MySQL, or single-machine memory. Give Hazelcast a try.

5. How to use it

Enough theory: how do you actually use it? Here's the Spring Boot embedded setup.

Add the Maven dependencies:

```xml
<dependency>
    <groupId>com.hazelcast</groupId>
    <artifactId>hazelcast</artifactId>
    <version>YOUR_HAZELCAST_VERSION</version>
</dependency>
<!-- Hazelcast Spring Boot integration, if needed -->
<dependency>
    <groupId>com.hazelcast</groupId>
    <artifactId>hazelcast-spring-boot</artifactId>
    <version>YOUR_HAZELCAST_SPRING_BOOT_VERSION</version>
</dependency>
```

Code:

```java
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.map.IMap;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class HazelcastService {

    @Autowired
    private HazelcastInstance hazelcastInstance;

    public void putData() {
        IMap<String, String> map = hazelcastInstance.getMap("my-map");
        map.put("key1", "value1");
    }

    public String getData(String key) {
        IMap<String, String> map = hazelcastInstance.getMap("my-map");
        return map.get(key);
    }
}
```
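The put/get calls above hide the routing that makes the map distributed: each key is hashed to one of a fixed number of partitions (271 by default), and each partition is owned by one cluster member. The sketch below is a simplified, illustrative pure-Java model of that routing, not Hazelcast's actual implementation (real Hazelcast hashes the serialized key and rebalances partition ownership dynamically):

```java
// Simplified model of Hazelcast-style key-to-partition routing.
// PARTITION_COUNT mirrors Hazelcast's default of 271; member assignment
// here is plain modulo, which is NOT how real Hazelcast balances partitions.
public class PartitionRouter {
    static final int PARTITION_COUNT = 271;

    // Map a key to a partition id, guarding against Integer.MIN_VALUE,
    // whose absolute value overflows.
    static int partitionId(Object key) {
        int hash = key.hashCode();
        return (hash == Integer.MIN_VALUE ? 0 : Math.abs(hash)) % PARTITION_COUNT;
    }

    // Assign each partition to one of `memberCount` members.
    static int ownerMember(int partitionId, int memberCount) {
        return partitionId % memberCount;
    }

    public static void main(String[] args) {
        int p = partitionId("key1");
        // The same key always routes to the same partition and member.
        System.out.println(p == partitionId("key1"));          // prints true
        System.out.println(p >= 0 && p < PARTITION_COUNT);     // prints true
        System.out.println("key1 -> partition " + p + " -> member " + ownerMember(p, 2));
    }
}
```

The point of the model: a get or put never scans the cluster; the key deterministically names the partition, and the partition names the member that owns the data.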
After it starts successfully, launch two service instances and you can see a cluster formed from the two Hazelcast members.

6. Source code

I want to look at the source from two angles.

1) The monitoring view served by seatunnel-web

Find the controller behind the log view:

```java
@RequestMapping("/seatunnel/api/v1/task")
@RestController
public class TaskInstanceController {

    @Autowired
    ITaskInstanceService<SeaTunnelJobInstanceDto> taskInstanceService;

    @GetMapping("/jobMetrics")
    @ApiOperation(value = "get the jobMetrics list", httpMethod = "GET")
    public Result<PageInfo<SeaTunnelJobInstanceDto>> getTaskInstanceList(
            @RequestAttribute(name = "userId") Integer userId,
            @RequestParam(name = "jobDefineName", required = false) String jobDefineName,
            @RequestParam(name = "executorName", required = false) String executorName,
            @RequestParam(name = "stateType", required = false) String stateType,
            @RequestParam(name = "startDate", required = false) String startTime,
            @RequestParam(name = "endDate", required = false) String endTime,
            @RequestParam("syncTaskType") String syncTaskType,
            @RequestParam("pageNo") Integer pageNo,
            @RequestParam("pageSize") Integer pageSize) {
        return taskInstanceService.getSyncTaskInstancePaging(
                userId, jobDefineName, executorName, stateType,
                startTime, endTime, syncTaskType, pageNo, pageSize);
    }
}
```

Step into getSyncTaskInstancePaging:

```java
public Result<PageInfo<SeaTunnelJobInstanceDto>> getSyncTaskInstancePaging(
        Integer userId, String jobDefineName, String executorName, String stateType,
        String startTime, String endTime, String syncTaskType,
        Integer pageNo, Integer pageSize) {
    JobDefinition jobDefinition = null;
    IPage<SeaTunnelJobInstanceDto> jobInstanceIPage;
    if (jobDefineName != null) {
        jobDefinition = jobDefinitionDao.getJobByName(jobDefineName);
    }
    Result<PageInfo<SeaTunnelJobInstanceDto>> result = new Result<>();
    PageInfo<SeaTunnelJobInstanceDto> pageInfo = new PageInfo<>(pageNo, pageSize);
    result.setData(pageInfo);
    baseService.putMsg(result, Status.SUCCESS);
    Date startDate = dateConverter(startTime);
    Date endDate = dateConverter(endTime);
    if (jobDefinition != null) {
        jobInstanceIPage = jobInstanceDao.queryJobInstanceListPaging(
                new Page<>(pageNo, pageSize), startDate, endDate,
                jobDefinition.getId(), syncTaskType);
    } else {
        jobInstanceIPage = jobInstanceDao.queryJobInstanceListPaging(
                new Page<>(pageNo, pageSize), startDate, endDate, null, syncTaskType);
    }
    List<SeaTunnelJobInstanceDto> records = jobInstanceIPage.getRecords();
    if (CollectionUtils.isEmpty(records)) {
        return result;
    }
    addJobDefineNameToResult(records);
    addRunningTimeToResult(records);
    // Key line: everything above is read from the local database;
    // this call fetches metrics from Hazelcast and updates the local rows.
    jobPipelineSummaryMetrics(records, syncTaskType, userId);
    pageInfo.setTotal((int) jobInstanceIPage.getTotal());
    pageInfo.setTotalList(records);
    result.setData(pageInfo);
    return result;
}
```

Step into jobPipelineSummaryMetrics(records, syncTaskType, userId):

```java
private void jobPipelineSummaryMetrics(
        List<SeaTunnelJobInstanceDto> records, String syncTaskType, Integer userId) {
    try {
        ArrayList<Long> jobInstanceIdList = new ArrayList<>();
        HashMap<Long, Long> jobInstanceIdAndJobEngineIdMap = new HashMap<>();
        for (SeaTunnelJobInstanceDto jobInstance : records) {
            if (jobInstance.getId() != null && jobInstance.getJobEngineId() != null) {
                jobInstanceIdList.add(jobInstance.getId());
                jobInstanceIdAndJobEngineIdMap.put(
                        jobInstance.getId(), Long.valueOf(jobInstance.getJobEngineId()));
            }
        }
        // Fetch the metrics for each job instance
        Map<Long, JobSummaryMetricsRes> jobSummaryMetrics =
                jobMetricsService.getALLJobSummaryMetrics(
                        userId, jobInstanceIdAndJobEngineIdMap, jobInstanceIdList, syncTaskType);
        for (SeaTunnelJobInstanceDto taskInstance : records) {
            if (jobSummaryMetrics.get(taskInstance.getId()) != null) {
                taskInstance.setWriteRowCount(
                        jobSummaryMetrics.get(taskInstance.getId()).getWriteRowCount());
                taskInstance.setReadRowCount(
                        jobSummaryMetrics.get(taskInstance.getId()).getReadRowCount());
            }
        }
    } catch (Exception e) {
        for (SeaTunnelJobInstanceDto taskInstance : records) {
            log.error("instance {} {} set instance and engine id error",
                    taskInstance.getId(), e);
        }
    }
}
```

Step into jobMetricsService.getALLJobSummaryMetrics(userId, jobInstanceIdAndJobEngineIdMap, jobInstanceIdList, syncTaskType):

```java
@Override
public Map<Long, JobSummaryMetricsRes> getALLJobSummaryMetrics(
        @NonNull Integer userId,
        @NonNull Map<Long, Long> jobInstanceIdAndJobEngineIdMap,
        @NonNull List<Long> jobInstanceIdList,
        @NonNull String syncTaskType) {
    log.info("jobInstanceIdAndJobEngineIdMap={}", jobInstanceIdAndJobEngineIdMap);
    funcPermissionCheck(SeatunnelFuncPermissionKeyConstant.JOB_METRICS_SUMMARY, userId);
    List<JobInstance> allJobInstance = jobInstanceDao.getAllJobInstance(jobInstanceIdList);
    if (allJobInstance.isEmpty()) {
        log.warn("getALLJobSummaryMetrics : allJobInstance is empty, task id list is {}",
                jobInstanceIdList);
        return new HashMap<>();
    }
    Map<Long, JobSummaryMetricsRes> result = null;
    // Fetch the metrics of running jobs from the Hazelcast cluster members
    Map<Long, HashMap<Integer, JobMetrics>> allRunningJobMetricsFromEngine =
            getAllRunningJobMetricsFromEngine(
                    allJobInstance.get(0).getEngineName(),
                    allJobInstance.get(0).getEngineVersion());
    // Different retrieval paths depending on task type
    if (syncTaskType.equals("BATCH")) {
        result = getMatricsListIfTaskTypeIsBatch(allJobInstance, userId,
                allRunningJobMetricsFromEngine, jobInstanceIdAndJobEngineIdMap);
    } else if (syncTaskType.equals("STREAMING")) {
        result = getMatricsListIfTaskTypeIsStreaming(allJobInstance, userId,
                allRunningJobMetricsFromEngine, jobInstanceIdAndJobEngineIdMap);
    }
    log.info("result is {}", result == null ? null : result.toString());
    return result;
}
```

Step into getAllRunningJobMetricsFromEngine(allJobInstance.get(0).getEngineName(), allJobInstance.get(0).getEngineVersion()):

```java
private Map<Long, HashMap<Integer, JobMetrics>> getAllRunningJobMetricsFromEngine(
        String engineName, String engineVersion) {
    Engine engine = new Engine(engineName, engineVersion);
    IEngineMetricsExtractor engineMetricsExtractor =
            (new EngineMetricsExtractorFactory(engine)).getEngineMetricsExtractor();
    // As the name suggests, this fetches the metrics of running jobs
    return engineMetricsExtractor.getAllRunningJobMetrics();
}
```

Step into engineMetricsExtractor.getAllRunningJobMetrics():

```java
@Override
public Map<Long, HashMap<Integer, JobMetrics>> getAllRunningJobMetrics() {
    HashMap<Long, HashMap<Integer, JobMetrics>> allRunningJobMetricsHashMap = new HashMap<>();
    try {
        // seaTunnelEngineProxy: the name says it all -- this is where
        // we really start talking to Hazelcast to fetch the data.
        String allJobMetricsContent = seaTunnelEngineProxy.getAllRunningJobMetricsContent();
        if (StringUtils.isEmpty(allJobMetricsContent)) {
            return new HashMap<>();
        }
        JsonNode jsonNode = JsonUtils.stringToJsonNode(allJobMetricsContent);
        Iterator<JsonNode> iterator = jsonNode.iterator();
        while (iterator.hasNext()) {
            LinkedHashMap<Integer, JobMetrics> metricsMap = new LinkedHashMap<>();
            JsonNode next = iterator.next();
            Long jobEngineId = 0L;
            JsonNode sourceReceivedCount = next.get("metrics").get("SourceReceivedCount");
            if (sourceReceivedCount != null && sourceReceivedCount.isArray()) {
                for (JsonNode node : sourceReceivedCount) {
                    jobEngineId = node.get("tags").get("jobId").asLong();
                    Integer pipelineId = node.get("tags").get("pipelineId").asInt();
                    JobMetrics currPipelineMetrics =
                            getOrCreatePipelineMetricsMapStatusRunning(metricsMap, pipelineId);
                    currPipelineMetrics.setReadRowCount(
                            currPipelineMetrics.getReadRowCount() + node.get("value").asLong());
                }
            }
            JsonNode sinkWriteCount = next.get("metrics").get("SinkWriteCount");
            if (sinkWriteCount != null && sinkWriteCount.isArray()) {
                for (JsonNode node : sinkWriteCount) {
                    jobEngineId = node.get("tags").get("jobId").asLong();
                    Integer pipelineId = node.get("tags").get("pipelineId").asInt();
                    JobMetrics currPipelineMetrics =
                            getOrCreatePipelineMetricsMapStatusRunning(metricsMap, pipelineId);
                    currPipelineMetrics.setWriteRowCount(
                            currPipelineMetrics.getWriteRowCount() + node.get("value").asLong());
                }
            }
            // The SinkWriteQPS, SourceReceivedQPS and CDCRecordEmitDelay branches
            // follow the same per-pipeline pattern (QPS values are summed,
            // CDC emit delay is averaged per pipeline); omitted here for brevity.
            log.info("jobEngineId={}, metricsMap={}", jobEngineId, metricsMap);
            allRunningJobMetricsHashMap.put(jobEngineId, metricsMap);
        }
    } catch (Exception e) {
        e.printStackTrace();
    }
    return allRunningJobMetricsHashMap;
}
```

At this point, anyone who has actually used the seatunnel-web UI will recognize that we've essentially reached the source of the monitoring data. Step into seaTunnelEngineProxy.getAllRunningJobMetricsContent():

```java
public String getAllRunningJobMetricsContent() {
    SeaTunnelClient seaTunnelClient = new SeaTunnelClient(clientConfig);
    try {
        return seaTunnelClient.getJobClient().getRunningJobMetrics();
    } finally {
        seaTunnelClient.close();
    }
}
```

Simple enough; keep following:

```java
public String getRunningJobMetrics() {
    return (String) this.hazelcastClient.requestOnMasterAndDecodeResponse(
            SeaTunnelGetRunningJobMetricsCodec.encodeRequest(),
            SeaTunnelGetRunningJobMetricsCodec::decodeResponse);
}
```

hazelcastClient: look familiar? SeaTunnel wraps its Hazelcast calls quite deeply. Almost there; keep going:

```java
public <S> S requestOnMasterAndDecodeResponse(
        @NonNull ClientMessage request, @NonNull Function<ClientMessage, Object> decoder) {
    if (request == null) {
        throw new NullPointerException("request is marked non-null but is null");
    } else if (decoder == null) {
        throw new NullPointerException("decoder is marked non-null but is null");
    } else {
        UUID masterUuid =
                this.hazelcastClient.getClientClusterService().getMasterMember().getUuid();
        return this.requestAndDecodeResponse(masterUuid, request, decoder);
    }
}
```

This determines which Hazelcast member (the master) to fetch from, then calls:

```java
public <S> S requestAndDecodeResponse(
        @NonNull UUID uuid, @NonNull ClientMessage request,
        @NonNull Function<ClientMessage, Object> decoder) {
    if (uuid == null) {
        throw new NullPointerException("uuid is marked non-null but is null");
    } else if (request == null) {
        throw new NullPointerException("request is marked non-null but is null");
    } else if (decoder == null) {
        throw new NullPointerException("decoder is marked non-null but is null");
    } else {
        ClientInvocation invocation =
                new ClientInvocation(this.hazelcastClient, request, (Object) null, uuid);
        try {
            ClientMessage response = (ClientMessage) invocation.invoke().get();
            return this.serializationService.toObject(decoder.apply(response));
        } catch (InterruptedException var6) {
            Thread.currentThread().interrupt();
            return null;
        } catch (Throwable var7) {
            throw ExceptionUtil.rethrow(var7);
        }
    }
}
```

Take note of ClientInvocation and ClientMessage: the same pair shows up when we trace the Hazelcast API below. From here down, the Hazelcast client sends the request over the network and get() blocks until the data comes back.

2) The Hazelcast API

For the Hazelcast API we'll trace the source using the HazelcastService example from section 5 as the entry point.
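Before diving in, the client-side pattern from part 1 -- build a request, invoke it against a chosen member, block on the future, decode the response -- can be modeled in a few lines of plain Java. This is a toy stand-in whose types are invented for illustration (real ClientInvocation sends a ClientMessage over the network; here the "member" just answers on another thread):

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Toy model of "invoke on a member, block, decode", the shape of
// requestAndDecodeResponse in the SeaTunnel/Hazelcast client.
public class RequestDecodeDemo {

    static class Invocation {
        private final String request;
        Invocation(String request) { this.request = request; }

        CompletableFuture<String> invoke() {
            // Simulate the remote member answering asynchronously.
            return CompletableFuture.supplyAsync(() -> "response-to:" + request);
        }
    }

    static <S> S requestAndDecodeResponse(String request, Function<String, S> decoder) {
        Invocation invocation = new Invocation(request);
        try {
            String response = invocation.invoke().get(); // block until the reply arrives
            return decoder.apply(response);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        String decoded = requestAndDecodeResponse("getRunningJobMetrics", r -> r.toUpperCase());
        System.out.println(decoded); // prints RESPONSE-TO:GETRUNNINGJOBMETRICS
    }
}
```

The key design point carried over from the real code: the caller stays synchronous (get() blocks) while the transport underneath is asynchronous, which is why the same machinery can also expose non-blocking variants.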
As that example shows, using Hazelcast is essentially the same as using Java's own data structures, so it's easy to pick up. Step into the get method of Hazelcast's map wrapper:

```java
@Override
public V get(@Nonnull Object key) {
    checkNotNull(key, NULL_KEY_IS_NOT_ALLOWED);
    return toObject(getInternal(key));
}
```

Into getInternal:

```java
protected Object getInternal(Object key) {
    // TODO: action for read-backup true is not well tested
    Data keyData = toDataWithStrategy(key);
    if (mapConfig.isReadBackupData()) {
        Object fromBackup = readBackupDataOrNull(keyData);
        if (fromBackup != null) {
            return fromBackup;
        }
    }
    MapOperation operation = operationProvider.createGetOperation(name, keyData);
    operation.setThreadId(getThreadId());
    return invokeOperation(keyData, operation);
}
```

The key is converted to Hazelcast's internal Data representation, wrapped in a MapOperation, and invoked:

```java
private Object invokeOperation(Data key, MapOperation operation) {
    int partitionId = partitionService.getPartitionId(key);
    operation.setThreadId(getThreadId());
    try {
        Object result;
        if (statisticsEnabled) {
            long startTimeNanos = Timer.nanos();
            Future future = operationService
                    .createInvocationBuilder(SERVICE_NAME, operation, partitionId)
                    .setResultDeserialized(false)
                    .invoke();
            result = future.get();
            incrementOperationStats(operation, localMapStats, startTimeNanos);
        } else {
            Future future = operationService
                    .createInvocationBuilder(SERVICE_NAME, operation, partitionId)
                    .setResultDeserialized(false)
                    .invoke();
            result = future.get();
        }
        return result;
    } catch (Throwable t) {
        throw rethrow(t);
    }
}
```

Executing the operation returns an InvocationFuture. InvocationFuture extends CompletableFuture, so if needed you can also compose it with other futures to orchestrate complex multi-threaded queries.

```java
@Override
public InvocationFuture invoke() {
    op.setServiceName(serviceName);
    Invocation invocation;
    if (target == null) {
        op.setPartitionId(partitionId).setReplicaIndex(replicaIndex);
        invocation = new PartitionInvocation(context, op, doneCallback, tryCount,
                tryPauseMillis, callTimeout, resultDeserialized,
                failOnIndeterminateOperationState, connectionManager);
    } else {
        invocation = new TargetInvocation(context, op, target, doneCallback, tryCount,
                tryPauseMillis, callTimeout, resultDeserialized, connectionManager);
    }
    return async ? invocation.invokeAsync() : invocation.invoke();
}
```

As you can see, the actual execution is done by different Invocation types, and sync versus async calls take different execution paths. Let's follow the synchronous invoke:

```java
private void invoke0(boolean isAsync) {
    if (invokeCount > 0) {
        throw new IllegalStateException("This invocation is already in progress");
    } else if (isActive()) {
        throw new IllegalStateException(
                "Attempt to reuse the same operation in multiple invocations. Operation is " + op);
    }
    try {
        setCallTimeout(op, callTimeoutMillis);
        setCallerAddress(op, context.thisAddress);
        op.setNodeEngine(context.nodeEngine);
        boolean isAllowed = context.operationExecutor.isInvocationAllowed(op, isAsync);
        if (!isAllowed && !isMigrationOperation(op)) {
            throw new IllegalThreadStateException(
                    Thread.currentThread() + " cannot make remote call: " + op);
        }
        doInvoke(isAsync);
    } catch (Exception e) {
        handleInvocationException(e);
    }
}
```

Continue into doInvoke:

```java
private void doInvoke(boolean isAsync) {
    if (!engineActive()) {
        return;
    }
    invokeCount++;
    setInvocationTime(op, context.clusterClock.getClusterTime());
    // We'll initialize the invocation before registering it. Invocation monitor iterates over
    // registered invocations and it must observe completely initialized invocations.
    Exception initializationFailure = null;
    try {
        initInvocationTarget();
    } catch (Exception e) {
        // We'll keep the initialization failure and notify the invocation with this failure
        // after the invocation is registered to the invocation registry.
        initializationFailure = e;
    }
    if (!context.invocationRegistry.register(this)) {
        return;
    }
    if (initializationFailure != null) {
        notifyError(initializationFailure);
        return;
    }
    if (isLocal()) {
        doInvokeLocal(isAsync);
    } else {
        doInvokeRemote();
    }
}
```

A local call enters doInvokeLocal; a remote call enters doInvokeRemote (the path used when calling out to a remote Hazelcast cluster). With the embedded Spring Boot setup, our example is a local call, so it enters doInvokeLocal. I won't trace that code line by line here; if you're interested, debug into it. Roughly, it calls execute, which puts the MapOperation onto a queue that a thread pool drains asynchronously. Let's focus on MapOperation:

```java
public abstract class MapOperation extends AbstractNamedOperation
        implements IdentifiedDataSerializable, ServiceNamespaceAware {

    private static final boolean ASSERTION_ENABLED =
            MapOperation.class.desiredAssertionStatus();

    protected transient MapService mapService;
    protected transient RecordStore<Record> recordStore;
    protected transient MapContainer mapContainer;
    protected transient MapServiceContext mapServiceContext;
    protected transient MapEventPublisher mapEventPublisher;
    protected transient boolean createRecordStoreOnDemand = true;
    protected transient boolean disposeDeferredBlocks = true;
    private transient boolean canPublishWanEvent;

    public MapOperation() {
    }

    public MapOperation(String name) {
        this.name = name;
    }

    @Override
    public final void beforeRun() throws Exception {
        super.beforeRun();
        mapService = getService();
        mapServiceContext = mapService.getMapServiceContext();
        mapEventPublisher = mapServiceContext.getMapEventPublisher();
        try {
            recordStore = getRecordStoreOrNull();
            if (recordStore == null) {
                mapContainer = mapServiceContext.getMapContainer(name);
            } else {
                mapContainer = recordStore.getMapContainer();
            }
        } catch (Throwable t) {
            disposeDeferredBlocks();
            throw rethrow(t, Exception.class);
        }
        canPublishWanEvent = canPublishWanEvent(mapContainer);
        assertNativeMapOnPartitionThread();
        innerBeforeRun();
    }

    @Override
    public final void run() {
        try {
            runInternal();
        } catch (NativeOutOfMemoryError e) {
            rerunWithForcedEviction();
        }
    }

    protected void runInternal() {
        // Intentionally empty method body.
        // Concrete classes can override this method.
    }

    @Override
    public final void afterRun() throws Exception {
        afterRunInternal();
        disposeDeferredBlocks();
        super.afterRun();
    }

    // ... the remainder of the class (WAN replication publishing, near-cache
    // invalidation, eviction helpers, record-store lookup, serialization hooks)
    // is omitted here for brevity.
}
```

Since the operation is executed asynchronously by a thread, it must implement run, and run delegates to runInternal. There are many implementations; find the map-related one:

```java
@Override
protected void runInternal() {
    Object currentValue = recordStore.get(dataKey, false, getCallerAddress());
    if (noCopyReadAllowed(currentValue)) {
        // in case of a remote call (e.g. a client call) we prevent making
        // an on-heap copy of the off-heap data
        result = (Data) currentValue;
    } else {
        // in case of a local call, we do make a copy, so we can safely share
        // it with e.g. near cache invalidation
        result = mapService.getMapServiceContext().toData(currentValue);
    }
}
```

This is essentially where the data held in Hazelcast-managed memory is read. I won't debug every step; keep following the calls down and you reach:

```java
public V get(Object key) {
    int hash = hashOf(key);
    return segmentFor(hash).get(key, hash);
}
```

Familiar, isn't it? Java's map lookups work the same way: hash the key to find the slot, then read the value. That said, the hashing here differs somewhat from HashMap's, which is dictated by Hazelcast's architecture. In short, there is a concept of partitions (shards): a put hashes the key to a partition, and partitions are spread across members. That is the basis of Hazelcast's distribution. If the architecture interests you, there is plenty of material to search for.

7. Closing thoughts

This post covered only the most basic usage of Hazelcast; for the examples shown you could just as well use Redis or a local cache. But once you have more advanced, real-world needs, Hazelcast's distributed-computing features are genuinely useful. The source walkthrough also only covered the local call path; if you're curious, debug into the remote path. The essence is the same: a remote call needs to (1) discover members, (2) register members, and (3) make network calls to other members. SeaTunnel's usage is a more heavily wrapped version of the same idea: ultimately a network call to another member, returning a future that blocks until the result comes back, and since it's all in-memory processing, it's very fast.

Oh, one more thing I almost forgot. I've kept talking about distributed features but only showed plain get/put caching. Here's a brief look at the distributed API:

```java
IExecutorService executorService = hazelcastInstance.getExecutorService("myExecutor");
Runnable task = () -> {
    // task logic goes here
    System.out.println("Executing task on "
            + hazelcastInstance.getCluster().getLocalMember().getAddress());
};
Future<?> future = executorService.submit(task);
future.get(); // wait for the task to complete
```

This lets you query data on the distributed members, aggregate it, and return the result. Feels a bit like MapReduce, doesn't it? Hazelcast can indeed also run MapReduce-style computations for complex jobs; look it up if you want to go deeper.
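The scatter-gather idea behind that API -- run a task on each member, then aggregate the partial results -- can be sketched locally with the JDK's own ExecutorService. This is a local simulation only: the "members" below are just threads in one JVM, whereas IExecutorService would run each callable on the cluster member owning the data:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Local simulation of distributed scatter-gather: each "member" counts
// its own shard of the data; the caller sums the partial counts.
public class ScatterGatherDemo {

    static int countAll(List<List<Integer>> memberData) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(memberData.size());
        try {
            // Scatter: submit one counting task per "member".
            List<Future<Integer>> partials = new ArrayList<>();
            for (List<Integer> shard : memberData) {
                // On real Hazelcast this callable would run on the member owning the shard.
                partials.add(pool.submit(shard::size));
            }
            // Gather: block on each partial result and aggregate.
            int total = 0;
            for (Future<Integer> f : partials) {
                total += f.get();
            }
            return total;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        List<List<Integer>> memberData = List.of(
                List.of(1, 2, 3), List.of(4, 5), List.of(6, 7, 8, 9));
        System.out.println("total rows = " + countAll(memberData)); // prints total rows = 9
    }
}
```

Swap the counting callable for any aggregation (sum, filter, top-N) and you get the map/reduce shape the post alludes to, minus the network hop.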