成都网站推广经理,网站建设 算什么,衡水安徽学校网站建设,开发网站开发工程师招聘要求用Java实现一个简单的KV数据库 开发思路#xff1a; 用map存储数据#xff0c;再用一个List记录操作日志#xff0c;开一个新线程将List中的操作写入日志文件中#xff0c;再开一个线程用于网络IO服务接收客户端的命令#xff0c;再启动时检查日志#xff0c;如果有数据就…用Java实现一个简单的KV数据库 开发思路 用map存储数据再用一个List记录操作日志开一个新线程将List中的操作写入日志文件中再开一个线程用于网络IO服务接收客户端的命令再启动时检查日志如果有数据就读入map中
关于redis
存储结构 redis redis的数据保存其实比较复杂使用一个哈希表保存所有键值对一个哈希表就是一个数组数组的每一个元素是一个哈希桶哈希桶中保存的是key和value的指针目录再通过指针去找对应的key和value当然对于value是List等数据结构还用到跳表双向列表压缩列表整数数组等数据结构SimpleKVDB 只用了Java的HashMap(偷懒) 线程 redis redis虽然成为单线程但是redis的网络IO和键值对读写是由一个线程但是另外的持久化异步删除集群数据同步等都是额外线程SimpleKVDB 数据读写网络IO一个线程持久化一个线程集群同步本来想做但是后来没有写也是新开一条线程 网络IO redis 单线程多路复用高性能IO模式SimpleKVDB 直接用Java标准库NIO多路复用IO模式 持久化 redis AOF操作日志RDB快照AOF用来记录每一次的操作(增删改)可以实时同步也可以每隔一个时间同步文件中RDB全量数据快照但是需要开一条子进程开销比较大redis4.0以后使用一种新的模式RDB每隔一段时间全量快照内存数据AOF记录每个RDB之间的操作记录当下一次全量RDB以后清空AOF再重新记录操作日志SimpleKVDB 只记录AOF操作日志开一个新线程有新的操作就写入后来我发现可以使用mmap内存映射的方法这样更快效率更高 主从数据一致 redis 选一台主服务器用于写入从服务器用于读取主服务器有数据写入就同步从服务器哨兵机制用于监控所有服务器如果主服务器崩溃就选择一台从服务器作为主服务器(会根据是否下线网络速度读写速度等选择主服务器)然后通知其他从服务器连接到新的主服务器SimpleKVDB 没写设想本来是想写一个配置文件写入主服务器IP其他从服务器IP开一个线程在服务端中写一个客户端当作主服务器读取配置文件只有主服务器才能开这个线程其他从服务器还是开启服务用来接收主服务器的数据同步从数据库的内存和操作日志里
操作展示 客户端 服务端 日志文件
目录结构
SimpleKVDB SimpleKVDBClient(客户端) SimpleKVDBClient.java(客户端) SimpleKVDBService(服务端) AofAnnotation.java (注解)AofInterface.java(接口)DynamicAgent.java(动态代理)SimpleKVDBService.java(服务端)
SimpleKVDBClient.java(客户端):
package SimpleKVDB.SimpleKVDBClient;import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.time.LocalDateTime;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;public class SimpleKVDBClient {public static void main(String[] args) throws Exception {SocketChannel socketChannel SocketChannel.open();socketChannel.configureBlocking(false);Selector selector Selector.open();socketChannel.register(selector, SelectionKey.OP_CONNECT);socketChannel.connect(new InetSocketAddress(127.0.0.1,5555));while (true){selector.select();//阻塞 等待事件发生SetSelectionKey selectionKeys selector.selectedKeys();selectionKeys.forEach(key -{try {if (key.isConnectable()){SocketChannel channel (SocketChannel) key.channel();if (channel.isConnectionPending()){//是否正在连接channel.finishConnect(); //结束正在连接ByteBuffer writeBuffer ByteBuffer.allocate(1024);writeBuffer.put((LocalDateTime.now() 连接成功).getBytes());writeBuffer.flip();channel.write(writeBuffer);//将buffer写入channelExecutorService service Executors.newSingleThreadExecutor(Executors.defaultThreadFactory());service.submit(()-{//线程从键盘读入数据try {while (true){writeBuffer.clear();//清空bufferInputStreamReader input new InputStreamReader(System.in);BufferedReader bufferedReader new BufferedReader(input);String senderMessage bufferedReader.readLine();writeBuffer.put(senderMessage.getBytes());writeBuffer.flip();channel.write(writeBuffer);}}catch (Exception e){e.printStackTrace();}});}channel.register(selector,SelectionKey.OP_READ);//注册事件}else if (key.isReadable()){//channel 有信息的输入SocketChannel channel (SocketChannel) key.channel();//哪个channel 触发了 readByteBuffer readBuffer ByteBuffer.allocate(1024);int count channel.read(readBuffer);//server发来的if (count 0){String receiveMessage new String(readBuffer.array(),0,count);System.out.println(响应结果receiveMessage);}}}catch (Exception e){e.printStackTrace();}finally {selectionKeys.clear();//移除已经发生的事件}});}}
}AofAnnotation.java注解
package SimpleKVDB.SimpleKVDBService;import java.lang.annotation.*;// ----------- 自定义的注解用于区分是什么操作其实也可以不用直接获取方法名区分也一样 -----------
// 自定义的注解
Retention(RetentionPolicy.RUNTIME)//注解会在class中存在运行时可通过反射获取
Target(ElementType.METHOD)//目标是方法
Documented
//文档生成时该注解将被包含在javadoc中可去掉
interface AofAnnotation {String name() default ;
}AofInterface.java动态代理接口:
package SimpleKVDB.SimpleKVDBService;// ----------- 动态代理需要的接口主要想实现切面效果在每一个操作后面加一个日志 -----------
// 动态代理需要的接口
// 只需要给增删改上加操作日志保证数据一致性
interface AofInterface {
// AofAnnotation(nameclear)
// int hashClear();AofAnnotation(nameset)Object hashSet(String key, Object value);AofAnnotation(nameremove)Object hashRemove(String key);
}DynamicAgent.java动态代理:
package SimpleKVDB.SimpleKVDBService;import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.HashMap;
import java.util.List;
import java.util.Map;// ----------- 动态代理实现切面效果的逻辑代码 -----------
// 动态代理
public class DynamicAgentT implements InvocationHandler {// 接口实现类实例如果不使用泛型这里可以直接用ObjectT rent;void setObject(T obj){this.rent obj;}// aof内存ListString listData;public void setListData(ListString list){this.listData list;}// 生成代码类public Object getProxy(){// 第一个参数是代理类的类加载器第二个参数是代理类要实现的接口第三个参数是处理接口方法的程序// 这里代理类是自己所以直接thisgetClass().getClassLoader()是获取加载器// getClass().getInterfaces() 是获取实现类的接口// 因为invoke()就是执行方法所以第三个参数也是本身thisreturn Proxy.newProxyInstance(this.getClass().getClassLoader(), rent.getClass().getInterfaces(),this);}// 处理代理实例并返回执行结果public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {// 动态代理本质就是通过反射实现这里就是执行这个对象的方法Object result method.invoke(rent, args);// 获取注解AofAnnotation say method.getAnnotation(AofAnnotation.class);// 注解的name内容String name say.name();System.out.println(name::name);// aof日志写入aofSetLog(name, args);return result;}// 给aof开辟一个内存public void aofSetLog(String name, Object[] args){MapString, Object dataMap new HashMapString, Object();// 日志格式String aofData *|;if(set.equals(name)){dataMap.put(args[0].toString(), args[1]);aofData aofData name|args[0].toString()|dataMap.get(args[0].toString());}if(remove.equals(name)){if(null ! dataMap dataMap.size()0){dataMap.remove(args[0].toString());}aofData aofData name|args[0].toString()|;}// 日志内存listData.add(aofData);
// System.out.println(listData:::listData);}// 返回日志数据public ListString getAofDatas(){return listData;}
}SimpleKVDBService.java服务端:
package SimpleKVDB.SimpleKVDBService;import java.io.*;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.*;// ----------- KV数据库的服务端实现 -----------
public class SimpleKVDBService implements AofInterface {// 全局存储MapString, Object globalMap;public void setGlobalMap(MapString, Object map){this.globalMap map;}// 动态代理对象AofInterface dl;public void setAofInterface(AofInterface i){this.dl i;}// 写入修改操作public Object hashSet(String key, Object value){return globalMap.put(key, value);}// 读取操作public Object hashGet(String key){return globalMap.get(key);}// 删除操作public Object hashRemove(String key){return globalMap.remove(key);}// 获取长度操作public int hashSize(){return globalMap.size();}// 是否为空操作操作public boolean hashIsEmpty(){return globalMap.isEmpty();}// aof日志ListString aofList;// 引用全局aof日志变量用来存储aof操作日志public void setAofList(ListString list){this.aofList list;}// 创建aof文件public File createAofFile(){final String ROOT . File.separator;File newFolder new File(ROOTsimpleKVDB);if(newFolder.exists() newFolder.isDirectory()){System.out.println(文件夹已经存在);}else {boolean isFolder newFolder.mkdir();if(!isFolder){System.out.println(文件夹创建失败);}}// 创建一个文件File newFile new File(newFolder.getPath(),aofDatas.aof);if(newFile.exists() newFile.isFile()){System.out.println(文件已经存在);}boolean isFile;try {isFile newFile.createNewFile();if(!isFile){System.out.println(文件创建失败);}} catch (IOException e) {e.printStackTrace();}return newFile;}// 开一个线程写aof写入文件public void aofFileThread() {new Thread(()-{System.out.println(aof日志写入线程Thread.currentThread().getName());while (true){this.setAofFile(this.aofList);}}).start();}// aof写入日志文件逻辑将aof操作日志写入文件中持久化public void setAofFile(ListString aofList){if(null ! aofList aofList.size()0){// 休眠一秒再写入不频繁使用IO写入try{Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}// 为什么文件夹和文件检测放这里每次都要检测是防止文件被误删除File newFile this.createAofFile();// 使用try的话自动回收/关闭资源会自动调用close方法不需要手动关闭// 将需要关闭的资源放在try(xxx; yyy;zzz;)// 流的关闭是有顺序的自己手动关闭很繁琐自动关闭大大降低了难度非常方便try(// 创建一个FileOutputStreamOutput是写入文件的byte数据传输流// FileOutputStream 第二参数是否追加FileOutputStream fos new FileOutputStream(newFile, true);// FileOutputStream是通过byte字节流的OutputStreamWriter是将字节流包装成想要的字符集的字符流写入OutputStreamWriter osw new OutputStreamWriter(fos, StandardCharsets.UTF_8);// 使用PrintWriter可以方便的写入一行字符第二个参数自动清空缓冲区PrintWriter pw new PrintWriter(osw, true);){// 一边遍历一边删除aof操作日志IteratorString iterator aofList.iterator();// 判断是否还有下一个元素while (iterator.hasNext()){// 获取下一个元素String str iterator.next();// println是每段换行写入print是不换行写入// 写入其实是一层一层走的先是写入内容进入PrintWriter中然后再OutputStreamWriter根据编码转成字节byte然后再是FileOutputStream字节流写入文件pw.println(str);// 因为是引用传递所以直接删除元素iterator.remove();}// 清空缓冲区因为数据是先进入缓冲区再写入文件需要在关闭前将缓冲区的数据全部写入文件才算完成这样才能关闭整个流缓存区的作用是一个字节一个字节写入太费事儿所以会等到一定量的字节再一起写入所以会出现一种可能就是缓存区还有少量的字节因为没达到量没有写入所以需要清空一下将里面所有剩余的字节都写入// PrintWriter中设置了自动清空缓冲区
// pw.flush();}catch (IOException e){e.printStackTrace();}}}// socket服务与客户端通讯public void socketServer(AofInterface dl){try {//创建ServerSocketChannel-- ServerSocket// 打开通道ServerSocketChannel serverSocketChannel ServerSocketChannel.open();// 打开 SocketChannel 并连接到端口InetSocketAddress inetSocketAddress new InetSocketAddress(5555);serverSocketChannel.socket().bind(inetSocketAddress);// 配置通道为非阻塞模式serverSocketChannel.configureBlocking(false);//开启selector,并注册accept事件// 获取一个选择器实例Selector selector Selector.open();// 将套接字通过到注册到选择器serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);while (true){// 阻塞等待事件发生selector.select();// 返回已发生的注册事件SetSelectionKey selectionKeys selector.selectedKeys();// 判断事件类型进行相应操作selectionKeys.forEach(key -{final SocketChannel client;try {// 根据key获得channelif (key.isAcceptable()){// 之所以转换ServerSocketChannel因为前面注册的就是这个类ServerSocketChannel serverChannel (ServerSocketChannel) key.channel();// 新的channel 和客户端建立了通道client serverChannel.accept();// 非阻塞client.configureBlocking(false);// 将新的channel和selector绑定client.register(selector,SelectionKey.OP_READ);//是否有数据可读}else if (key.isReadable()){client (SocketChannel) key.channel();ByteBuffer readBuffer ByteBuffer.allocate(1024);int count client.read(readBuffer);if (count0){readBuffer.flip();Charset charset StandardCharsets.UTF_8;String receiveMassage String.valueOf(charset.decode(readBuffer).array());// 显示哪个client发消息System.out.println(client : receiveMassage);// 向客户端返回的信息String serverStr ;// 根据客户端不同的命令执行不同的方法if(Objects.equals(receiveMassage.split( )[0], set)){dl.hashSet(receiveMassage.split( )[1], receiveMassage.split( )[2]);serverStr set OK;}if(Objects.equals(receiveMassage.split( )[0], remove)){dl.hashRemove(receiveMassage.split( )[1]);serverStr remove OK;}if(Objects.equals(receiveMassage.split( )[0], get)){serverStr this.hashGet(receiveMassage.split( )[1]).toString();}if(Objects.equals(receiveMassage.split( )[0], isempty)){serverStr String.valueOf(this.hashIsEmpty());}if(Objects.equals(receiveMassage.split( )[0], size)){serverStr String.valueOf(this.hashSize());}if(receiveMassage.contains(连接成功)){serverStr receiveMassage;}SocketChannel channel (SocketChannel) key.channel();;ByteBuffer writeBuffer ByteBuffer.allocate(1024);//返回客户端数据writeBuffer.put((serverStr).getBytes());writeBuffer.flip();channel.write(writeBuffer);}}// 处理完事件一定要移除//selectionKeys.clear();}catch (Exception e){e.printStackTrace();}finally {// 处理完事件一定要移除selectionKeys.clear();}});}}catch (IOException e){e.printStackTrace();}}// socket服务线程public void socketThread(){new Thread(()-{System.out.println(socketServer线程Thread.currentThread().getName());this.socketServer(this.dl);}).start();}// 启动时检查持久化aof日志文件public void setAofToMap(){System.out.println(开始从AOF中恢复数据);File readFile this.createAofFile();// 使用try的话自动回收/关闭资源会自动调用close方法不需要手动关闭// 将需要关闭的资源放在try(xxx; yyy;zzz;)// 流的关闭是有顺序的自己手动关闭很繁琐自动关闭大大降低了难度非常方便try(// 创建一个FileInputStreamInput是写入文件的byte数据传输流FileInputStream fis new FileInputStream(readFile);// FileInputStream是通过byte字节流的InputStreamReader是将字节流包装成想要的字符集的字符流写入InputStreamReader isr new InputStreamReader(fis, StandardCharsets.UTF_8);// 使用BufferedReader增加缓存可以方便的写入一行字符BufferedReader reader new BufferedReader(isr);){// reader.lines().map(String::trim).forEach(System.out::println); 这是一种lambda写法效果和下面一样String str;// 为什么要放在while的条件里面赋值呢是因为readLine()一行一行读取如果到文件结尾了会返回一个null如果放在while的代码体里赋值就需要多一步null的判断// 读取和写入正好相反是先从文件读取内容到缓存区然后从缓存区读出来while ((str reader.readLine()) ! null){String methodStr str.split(\\|)[1];String keyStr str.split(\\|)[2];// 根据不同指令操作不同方法if(set.equals(methodStr)){Object valueStr str.split(\\|)[3];this.hashSet(keyStr, valueStr);}if(remove.equals(methodStr)){this.hashRemove(keyStr);}}System.out.println(AOF中恢复数据结束);} catch (IOException e) {e.printStackTrace();}}public static void main(String[] args) {System.out.println(主线程: Thread.currentThread().getName());// 全局内存MapString, Object maps new HashMap();// 全局aof日志内存ListString lists new ArrayList();// 服务主体类SimpleKVDBService sKvService new SimpleKVDBService();// 全局存储内存sKvService.setGlobalMap(maps);// 动态代理主要是用于给操作添加日志DynamicAgentAofInterface nd new DynamicAgentAofInterface();// 全局aof内存nd.setListData(lists);nd.setObject(sKvService);// 获取代理对象AofInterface dl (AofInterface) nd.getProxy();// 启动时检查aof文件是否存在sKvService.setAofToMap();// 服务主体获取已经有日志信息的aof日志信息sKvService.setAofList(nd.getAofDatas());// 引用动态代理sKvService.setAofInterface(dl);// 子线程写aof写入文件sKvService.aofFileThread();// 子线程socket服务线程sKvService.socketThread(); System.out.println(sKvService.globalMap);System.out.println(22222:nd.getAofDatas());System.out.println(list:sKvService.aofList);System.out.println(333333:sKvService.globalMap);}
}