Commit e2f3413d by XSwang

init commit

parents
HELP.md
/target/
.mvn/
mvnw.cmd
mvnw
### STS ###
.apt_generated
.classpath
.factorypath
.project
.settings
.springBeans
.sts4-cache
### IntelliJ IDEA ###
.idea
*.iws
*.iml
*.ipr
### NetBeans ###
/nbproject/private/
/nbbuild/
/dist/
/nbdist/
/.nb-gradle/
/build/
### VS Code ###
.vscode/
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.7.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.huaching.k8s</groupId>
<artifactId>ths-k8s-client</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>ths-k8s-client</name>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.fabric8</groupId>
<artifactId>kubernetes-client</artifactId>
<version>4.4.1</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
package com.huaching.k8s;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class ThsK8sClientApplication {
public static void main(String[] args) {
SpringApplication.run(ThsK8sClientApplication.class, args);
}
}
package com.huaching.k8s.config;
import io.fabric8.kubernetes.client.Config;
import io.fabric8.kubernetes.client.ConfigBuilder;
import io.fabric8.kubernetes.client.DefaultKubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClient;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class KubectlConfig {
@Bean
public KubernetesClient kubernetesClient(){
Config config = new ConfigBuilder().withMasterUrl("https://47.110.20.92:6443").build();
return new DefaultKubernetesClient(config);
}
}
package com.huaching.k8s.config;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.HttpHeaders;
import org.springframework.web.reactive.config.CorsRegistry;
import org.springframework.web.reactive.config.WebFluxConfigurer;
@Configuration
public class WebFluxConfig implements WebFluxConfigurer {
/**
* 全局跨域配置,根据各自需求定义
* @param registry
*/
@Override
public void addCorsMappings(CorsRegistry registry) {
registry.addMapping("/**")
.allowCredentials(true)
.allowedOrigins("*")
.allowedHeaders("*")
.allowedMethods("*")
.exposedHeaders(HttpHeaders.SET_COOKIE);
}
}
package com.huaching.k8s.controller;
import com.huaching.k8s.res.ResponseBO;
import com.huaching.k8s.service.KubectlService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Mono;
@RestController
@RequestMapping("/kubectl")
public class KubectlController {
private static final Logger logger = LoggerFactory.getLogger(KubectlController.class);
@Autowired
private KubectlService kubectlService;
@PostMapping("/namespace")
public Mono<ResponseBO> namespace() {
return Mono.create(monoSink -> monoSink.success(kubectlService.getNamespaceList()));
}
@PostMapping("/pods")
public Mono<ResponseBO> pods(@RequestParam String namespace, String podName) {
return Mono.create(monoSink -> monoSink.success(kubectlService.getPodList(namespace, podName)));
}
@PostMapping("/pod")
public Mono<ResponseBO> pod(@RequestParam String namespace, @RequestParam String podName) {
return Mono.create(monoSink -> monoSink.success(kubectlService.getPod(namespace, podName)));
}
@PostMapping("/pod/logs")
public Mono<ResponseBO> podLogs(@RequestParam String namespace, @RequestParam String podName) {
return Mono.create(monoSink -> monoSink.success(kubectlService.getPodLogs(namespace, podName)));
}
}
package com.huaching.k8s.pojo;
public class test {
}
package com.huaching.k8s.pojo.vo;
public class PodVO {
private String podName;
private String namespace;
private Boolean ready;
private String status;
private Integer restarts;
private String startTime;
private String age;
public String getPodName() {
return podName;
}
public void setPodName(String podName) {
this.podName = podName;
}
public String getNamespace() {
return namespace;
}
public void setNamespace(String namespace) {
this.namespace = namespace;
}
public Boolean getReady() {
return ready;
}
public void setReady(Boolean ready) {
this.ready = ready;
}
public String getStatus() {
return status;
}
public void setStatus(String status) {
this.status = status;
}
public Integer getRestarts() {
return restarts;
}
public void setRestarts(Integer restarts) {
this.restarts = restarts;
}
public String getStartTime() {
return startTime;
}
public void setStartTime(String startTime) {
this.startTime = startTime;
}
public String getAge() {
return age;
}
public void setAge(String age) {
this.age = age;
}
}
package com.huaching.k8s.res;
import java.io.Serializable;
/**
* http 响应数据结构
* return ResponseBO.Builder.init().setData(pageDataListVo).build();
* @author xiajl
* @date 2017/6/6.
*/
public class ResponseBO<T> implements Serializable {
public static final int ERROR = 0;
public static final int SUCCESS = 1;
private Integer completeCode;
private String reasonCode;
private String reasonMessage;
private T data;
/**
* 外部不要直接new对象
*/
private ResponseBO(Builder<T> builder) {
this.reasonCode = builder.reasonCode;
this.reasonMessage = builder.reasonMessage;
this.completeCode = builder.completeCode;
this.data = builder.data;
}
@Deprecated
public ResponseBO(){
}
public Integer getCompleteCode() {
return completeCode;
}
public String getReasonCode() {
return reasonCode;
}
public String getReasonMessage() {
return reasonMessage;
}
public T getData() {
return data;
}
@Deprecated
public ResponseBO<T> setCompleteCode(Integer completeCode) {
this.completeCode = completeCode;
return this;
}
@Deprecated
public ResponseBO<T> setReasonCode(String reasonCode) {
this.reasonCode = reasonCode;
return this;
}
@Deprecated
public ResponseBO<T> setReasonMessage(String reasonMessage) {
this.reasonMessage = reasonMessage;
return this;
}
@Deprecated
public ResponseBO<T> setData(T data) {
this.data = data;
return this;
}
@Override
public String toString() {
return "ResponseBO{" +
"completeCode=" + completeCode +
", reasonCode='" + reasonCode + '\'' +
", reasonMessage='" + reasonMessage + '\'' +
", data=" + (data == null ? "null" : data.toString()) +
'}';
}
public static ResponseBO responseFail(String s) {
return ResponseBO.Builder.init().setFailMessage(s).build();
}
public static ResponseBO responseOK() {
return ResponseBO.Builder.init().build();
}
public static class Builder<K>{
private Integer completeCode;
private String reasonCode;
private String reasonMessage;
private K data;
private Builder(int completeCode){
this.completeCode = completeCode;
}
static public <K> Builder<K> init() { return new Builder<>(SUCCESS); }
public Builder<K> setCompleteCode(int code) {
this.completeCode = code;
return this;
}
public Builder<K> setReasonCode(String code) {
this.reasonCode = code;
return this;
}
public Builder<K> setReasonMessage(String msg) {
this.reasonMessage = msg;
return this;
}
public Builder<K> setFailMessage(String msg) {
this.reasonMessage = msg;
this.completeCode = ERROR;
return this;
}
public Builder<K> setData(K k) {
this.data = k;
return this;
}
public ResponseBO<K> build() { return new ResponseBO<>(this); }
}
}
package com.huaching.k8s.service;
import com.huaching.k8s.res.ResponseBO;
public interface KubectlService {
ResponseBO getNamespaceList();
ResponseBO getPodList(String namespace, String podName);
ResponseBO getPod(String namespace, String podName);
ResponseBO getPodLogs(String namespace, String podName);
ResponseBO deletePod(String namespace, String podName);
}
package com.huaching.k8s.service.impl;
import com.huaching.k8s.pojo.vo.PodVO;
import com.huaching.k8s.res.ResponseBO;
import com.huaching.k8s.service.KubectlService;
import com.huaching.k8s.util.DateDistanceUtil;
import io.fabric8.kubernetes.api.model.*;
import io.fabric8.kubernetes.client.KubernetesClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
@Service
public class KubectlServiceImpl implements KubectlService {
@Autowired
private KubernetesClient kubernetesClient;
@Override
public ResponseBO getNamespaceList() {
return ResponseBO.Builder.init().setData(kubernetesClient.namespaces().list()).build();
}
@Override
public ResponseBO getPodList(String namespace, String podName) {
PodList podList = kubernetesClient.pods().inNamespace(namespace).list();
List<PodVO> podVOList = new ArrayList<>();
podList.getItems().forEach(pod -> {
ObjectMeta metadata = pod.getMetadata();
PodVO podVO = new PodVO();
podVO.setNamespace(metadata.getNamespace());
podVO.setPodName(metadata.getName());
List<ContainerStatus> containerStatusList = pod.getStatus().getContainerStatuses();
if (containerStatusList.size() >= 1){
ContainerStatus containerStatus = containerStatusList.get(0);
podVO.setReady(Optional.ofNullable(containerStatus.getReady()).orElse(false));
podVO.setRestarts(Optional.ofNullable(containerStatus.getRestartCount()).orElse(0));
}
PodStatus podStatus = pod.getStatus();
podVO.setStatus(Optional.ofNullable(podStatus.getPhase()).orElse(""));
String startTime = Optional.ofNullable(podStatus.getStartTime().replaceAll("T"," ")
.replaceAll("Z","")).orElse("");
podVO.setStartTime(startTime);
podVO.setAge(DateDistanceUtil.getDistanceTime(startTime));
podVOList.add(podVO);
});
// podName不等于空,就选出来带有podName的pod
if (podName != null && !podName.isEmpty()){
return ResponseBO.Builder.init().setData(podVOList.stream()
.filter(podVO -> podVO.getPodName().indexOf(podName) > -1).collect(Collectors.toList())).build();
}
return ResponseBO.Builder.init().setData(podVOList).build();
}
@Override
public ResponseBO getPod(String namespace, String podName) {
Pod pod = kubernetesClient.pods().inNamespace(namespace).withName(podName).get();
return ResponseBO.Builder.init().setData(pod).build();
}
@Override
public ResponseBO getPodLogs(String namespace, String podName) {
String logs = kubernetesClient.pods().inNamespace(namespace).withName(podName).getLog();
return ResponseBO.Builder.init().setData(logs).build();
}
@Override
public ResponseBO deletePod(String namespace, String podName) {
kubernetesClient.pods().inNamespace(namespace).withName(podName).delete();
return ResponseBO.Builder.init().build();
}
public static void main(String[] args) {
System.out.println("adsd".contains("asf"));
}
}
package com.huaching.k8s.util;
import java.text.DateFormat;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
/**
* 时间相距
* @author Ben
* @version 1.0
* @date 2009-10-21 16:38:51
*/
public class DateDistanceUtil {
/**
* 两个时间之间相差距离多少天
* @param one 时间参数 1:
* @param two 时间参数 2:
* @return 相差天数
*/
public static long getDistanceDays(String str1, String str2) throws Exception{
DateFormat df = new SimpleDateFormat("yyyy-MM-dd");
Date one;
Date two;
long days=0;
try {
one = df.parse(str1);
two = df.parse(str2);
long time1 = one.getTime();
long time2 = two.getTime();
long diff ;
if(time1<time2) {
diff = time2 - time1;
} else {
diff = time1 - time2;
}
days = diff / (1000 * 60 * 60 * 24);
} catch (ParseException e) {
e.printStackTrace();
}
return days;
}
/**
* 两个时间相差距离多少天多少小时多少分多少秒
* @param str1 时间参数 1 格式:1990-01-01 12:00:00
* @param str2 时间参数 2 格式:2009-01-01 12:00:00
* @return long[] 返回值为:{天, 时, 分, 秒}
*/
public static long[] getDistanceTimes(String str1, String str2) {
DateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
Date one;
Date two;
long day = 0;
long hour = 0;
long min = 0;
long sec = 0;
try {
one = df.parse(str1);
two = df.parse(str2);
long time1 = one.getTime();
long time2 = two.getTime();
long diff ;
if(time1<time2) {
diff = time2 - time1;
} else {
diff = time1 - time2;
}
day = diff / (24 * 60 * 60 * 1000);
hour = (diff / (60 * 60 * 1000) - day * 24);
min = ((diff / (60 * 1000)) - day * 24 * 60 - hour * 60);
sec = (diff/1000-day*24*60*60-hour*60*60-min*60);
} catch (ParseException e) {
e.printStackTrace();
}
long[] times = {day, hour, min, sec};
return times;
}
/**
* 两个时间相差距离多少天多少小时多少分多少秒
* @param str1 时间参数 1 格式:1990-01-01 12:00:00
* @param str2 时间参数 2 格式:2009-01-01 12:00:00
* @return String 返回值为:xx天xx小时xx分xx秒
*/
public static String getDistanceTime(String str1, String str2) {
DateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
Date one;
Date two;
long day = 0;
long hour = 0;
long min = 0;
long sec = 0;
try {
one = df.parse(str1);
two = df.parse(str2);
long time1 = one.getTime();
long time2 = two.getTime();
long diff ;
if(time1<time2) {
diff = time2 - time1;
} else {
diff = time1 - time2;
}
day = diff / (24 * 60 * 60 * 1000);
hour = (diff / (60 * 60 * 1000) - day * 24);
min = ((diff / (60 * 1000)) - day * 24 * 60 - hour * 60);
sec = (diff/1000-day*24*60*60-hour*60*60-min*60);
} catch (ParseException e) {
e.printStackTrace();
}
return day + "天" + hour + "小时" + min + "分" + sec + "秒";
}
/**
* 两个时间相差距离多少天多少小时多少分多少秒
* @param str1 时间参数 1 格式:1990-01-01 12:00:00
* @return String 返回值为:xx天xx小时xx分xx秒
*/
public static String getDistanceTime(String str1) {
DateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
Date one;
Date two;
long day = 0;
long hour = 0;
long min = 0;
long sec = 0;
try {
one = df.parse(str1);
two = new Date();
long time1 = one.getTime();
long time2 = two.getTime();
long diff ;
if(time1<time2) {
diff = time2 - time1;
} else {
diff = time1 - time2;
}
day = diff / (24 * 60 * 60 * 1000);
hour = (diff / (60 * 60 * 1000) - day * 24);
min = ((diff / (60 * 1000)) - day * 24 * 60 - hour * 60);
sec = (diff/1000-day*24*60*60-hour*60*60-min*60);
} catch (ParseException e) {
e.printStackTrace();
}
return day + "天" + hour + "小时" + min + "分" + sec + "秒";
}
}
\ No newline at end of file
package com.huaching.k8s.websocket;
import io.netty.channel.Channel;
import io.netty.channel.ChannelId;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.util.concurrent.GlobalEventExecutor;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
public class ChannelSupervise {
private static ChannelGroup GlobalGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
private static ConcurrentMap<String, ChannelId> ChannelMap = new ConcurrentHashMap();
public static void addChannel(Channel channel){
GlobalGroup.add(channel);
ChannelMap.put(channel.id().asShortText(),channel.id());
}
public static void removeChannel(Channel channel){
GlobalGroup.remove(channel);
ChannelMap.remove(channel.id().asShortText());
}
public static Channel findChannel(String id){
return GlobalGroup.find(ChannelMap.get(id));
}
public static void send2All(TextWebSocketFrame tws){
GlobalGroup.writeAndFlush(tws);
}
}
//package com.huaching.k8s.websocket;
//
//import io.fabric8.kubernetes.client.KubernetesClient;
//import io.fabric8.kubernetes.client.dsl.LogWatch;
//import org.springframework.beans.BeanUtils;
//import org.springframework.beans.factory.annotation.Autowired;
//import org.springframework.stereotype.Component;
//import org.springframework.web.reactive.socket.WebSocketHandler;
//import org.springframework.web.reactive.socket.WebSocketSession;
//import reactor.core.publisher.Mono;
//import sun.rmi.runtime.Log;
//
//import java.io.IOException;
//
//@Component
//public class EchoHandler implements WebSocketHandler {
//
// @Autowired
// private KubernetesClient kubernetesClient;
//
// @Override
// public Mono<Void> handle(final WebSocketSession session) {
//
// LogWatch log = kubernetesClient.pods().inNamespace("huaching-dev").withName("gateway-7848b9cd89-vr6dv").tailingLines(10).watchLog();
//
// // 从“管道输入流”读取>1024个字节时,就停止读取
// Mono<Void> send = null;
// int total=0;
// while(true) {
// byte[] buf = new byte[1024];
// try {
// int len = log.getOutput().read(buf);
// total += len;
// // 若读取的字节总数>1024,则退出循环。
// // log.getOutput().close();
// session.textMessage(new String(buf,0,len));
// } catch (IOException e) {
// break;
// }
// }
//
//
// return session.send(
// session.receive()
// .map(msg -> session.textMessage(
// "服务端返回:小明, " + msg.getPayloadAsText())));
// }
//}
\ No newline at end of file
package com.huaching.k8s.websocket;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class NioWebSocketChannelInitializer extends ChannelInitializer<SocketChannel> {
@Autowired
private NioWebSocketHandler nioWebSocketHandler;
@Override
protected void initChannel(SocketChannel ch) {
// 设置log监听器,并且日志级别为debug,方便观察运行流程
ch.pipeline().addLast("logging",new LoggingHandler("DEBUG"));
// 设置解码器
ch.pipeline().addLast("http-codec",new HttpServerCodec());
// 聚合器,使用websocket会用到
ch.pipeline().addLast("aggregator",new HttpObjectAggregator(65536));
// 用于大数据的分区传输
ch.pipeline().addLast("http-chunked",new ChunkedWriteHandler());
// 自定义的业务handler
ch.pipeline().addLast("handler", nioWebSocketHandler);
}
}
\ No newline at end of file
package com.huaching.k8s.websocket;
import io.fabric8.kubernetes.client.Config;
import io.fabric8.kubernetes.client.ConfigBuilder;
import io.fabric8.kubernetes.client.DefaultKubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.dsl.LogWatch;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.http.websocketx.*;
import io.netty.util.CharsetUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.Date;
import static io.netty.handler.codec.http.HttpUtil.isKeepAlive;
@Component
@ChannelHandler.Sharable
public class NioWebSocketHandler extends SimpleChannelInboundHandler<Object> {
private final Logger logger= LoggerFactory.getLogger(NioWebSocketHandler.class);
private WebSocketServerHandshaker handshaker;
@Override
protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
logger.debug("收到消息:"+msg);
if (msg instanceof FullHttpRequest){
//以http请求形式接入,但是走的是websocket
handleHttpRequest(ctx, (FullHttpRequest) msg);
}else if (msg instanceof WebSocketFrame){
//处理websocket客户端的消息
handlerWebSocketFrame(ctx, (WebSocketFrame) msg);
}
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
//添加连接
logger.debug("客户端加入连接:"+ctx.channel());
ChannelSupervise.addChannel(ctx.channel());
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
//断开连接
logger.debug("客户端断开连接:"+ctx.channel());
ChannelSupervise.removeChannel(ctx.channel());
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
private void handlerWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame){
// 判断是否关闭链路的指令
if (frame instanceof CloseWebSocketFrame) {
handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain());
return;
}
// 判断是否ping消息
if (frame instanceof PingWebSocketFrame) {
ctx.channel().write(
new PongWebSocketFrame(frame.content().retain()));
return;
}
// 本例程仅支持文本消息,不支持二进制消息
if (!(frame instanceof TextWebSocketFrame)) {
logger.debug("本例程仅支持文本消息,不支持二进制消息");
throw new UnsupportedOperationException(String.format(
"%s frame types not supported", frame.getClass().getName()));
}
// 返回应答消息
String request = ((TextWebSocketFrame) frame).text();
logger.debug("服务端收到:" + request);
Config config = new ConfigBuilder().withMasterUrl("https://47.110.20.92:6443").build();
KubernetesClient client = new DefaultKubernetesClient(config);
LogWatch log = client.pods().inNamespace("huaching-dev").withName("gateway-7848b9cd89-vr6dv").tailingLines(10).watchLog();
// 从“管道输入流”读取>1024个字节时,就停止读取
int total=0;
while(true) {
byte[] buf = new byte[1024];
try {
int len = log.getOutput().read(buf);
total += len;
// System.out.println(new String(buf,0,len));
// 若读取的字节总数>1024,则退出循环。
// log.getOutput().close();
TextWebSocketFrame tws = new TextWebSocketFrame(new String(buf,0,len));
ctx.channel().writeAndFlush(tws);
} catch (IOException e) {
break;
}
}
//
// TextWebSocketFrame tws = new TextWebSocketFrame(new Date().toString()
// + ctx.channel().id() + ":" + request);
//
//
// // 群发
// ChannelSupervise.send2All(tws);
// 返回【谁发的发给谁】
// ctx.channel().writeAndFlush(tws);
}
/**
* 唯一的一次http请求,用于创建websocket
* */
private void handleHttpRequest(ChannelHandlerContext ctx,
FullHttpRequest req) {
//要求Upgrade为websocket,过滤掉get/Post
if (!req.decoderResult().isSuccess()
|| (!"websocket".equals(req.headers().get("Upgrade")))) {
//若不是websocket方式,则创建BAD_REQUEST的req,返回给客户端
sendHttpResponse(ctx, req, new DefaultFullHttpResponse(
HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST));
return;
}
WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory(
"ws://localhost:8081/websocket", null, false);
handshaker = wsFactory.newHandshaker(req);
if (handshaker == null) {
WebSocketServerHandshakerFactory
.sendUnsupportedVersionResponse(ctx.channel());
} else {
handshaker.handshake(ctx.channel(), req);
}
}
/**
* 拒绝不合法的请求,并返回错误信息
* */
private static void sendHttpResponse(ChannelHandlerContext ctx,
FullHttpRequest req, DefaultFullHttpResponse res) {
// 返回应答给客户端
if (res.status().code() != 200) {
ByteBuf buf = Unpooled.copiedBuffer(res.status().toString(),
CharsetUtil.UTF_8);
res.content().writeBytes(buf);
buf.release();
}
ChannelFuture f = ctx.channel().writeAndFlush(res);
// 如果是非Keep-Alive,关闭连接
if (!isKeepAlive(req) || res.status().code() != 200) {
f.addListener(ChannelFutureListener.CLOSE);
}
}
}
\ No newline at end of file
package com.huaching.k8s.websocket;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
@Component
public class NioWebSocketServer {
private final Logger logger= LoggerFactory.getLogger(NioWebSocketServer.class);
@Autowired
private NioWebSocketChannelInitializer nioWebSocketChannelInitializer;
private void init(){
logger.info("正在启动websocket服务器");
NioEventLoopGroup boss = new NioEventLoopGroup();
NioEventLoopGroup work = new NioEventLoopGroup();
try {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(boss,work);
bootstrap.channel(NioServerSocketChannel.class);
bootstrap.childHandler(nioWebSocketChannelInitializer);
Channel channel = bootstrap.bind(8081).sync().channel();
logger.info("webSocket服务器启动成功:"+channel);
channel.closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
logger.info("运行出错:"+e);
}finally {
boss.shutdownGracefully();
work.shutdownGracefully();
logger.info("websocket服务器已关闭");
}
}
/**
* 开启及服务线程
*/
// @PostConstruct
// public void start() {
// new NioWebSocketServer().init();
// }
public static void main(String[] args) {
new NioWebSocketServer().init();
}
}
//package com.huaching.k8s.websocket;
//
//import org.springframework.beans.factory.annotation.Autowired;
//import org.springframework.context.annotation.Bean;
//import org.springframework.context.annotation.Configuration;
//import org.springframework.core.Ordered;
//import org.springframework.web.reactive.HandlerMapping;
//import org.springframework.web.reactive.handler.SimpleUrlHandlerMapping;
//import org.springframework.web.reactive.socket.WebSocketHandler;
//import org.springframework.web.reactive.socket.server.support.WebSocketHandlerAdapter;
//
//import java.util.HashMap;
//import java.util.Map;
//
//@Configuration
//public class WebSocketConfiguration {
//
// @Autowired
// @Bean
// public HandlerMapping webSocketMapping(final EchoHandler echoHandler) {
// final Map<String, WebSocketHandler> map = new HashMap<>();
// map.put("/echo", echoHandler);
//
// final SimpleUrlHandlerMapping mapping = new SimpleUrlHandlerMapping();
// mapping.setOrder(Ordered.HIGHEST_PRECEDENCE);
// mapping.setUrlMap(map);
// return mapping;
// }
//
// @Bean
// public WebSocketHandlerAdapter handlerAdapter() {
// return new WebSocketHandlerAdapter();
// }
//}
\ No newline at end of file
package com.huaching.k8s;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
@RunWith(SpringRunner.class)
@SpringBootTest
public class ThsK8sClientApplicationTests {
@Test
public void contextLoads() {
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment