Skip to content

Commit fc37f70

Browse files
committed
complete the main functions.
1 parent 050a024 commit fc37f70

45 files changed

Lines changed: 1244 additions & 100 deletions

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

.idea/workspace.xml

Lines changed: 62 additions & 27 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

README.md

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,22 @@
1-
# ObjectStorage
2-
分布式对象存储项目开发
1+
# ObjectStorage 分布式对象存储项目开发
2+
3+
## 简介
4+
项目主要分为apiServer端和dataServer端,apiServer向外提供restful接口,负责接受客户的连接请求。
5+
dataServer负责存储数据。rabbitmq用于apiServer和dataServer端之间心跳、文件定位等。Elasticearch用于存储元数据。
6+
7+
## 主要特点
8+
### 数据校验和去重
9+
主要思想,计算文件的sha256哈希值,以哈希值代替文件名进行存储,元数据服务器存储<filename,hash>的映射。客户上传数据时会将哈希作为http头部数据一起上传,服务器接受时会计算哈希,并对比。同时
10+
由于哈希的唯一性,重复数据只保留一份。
11+
### 数据冗余和即时修复
12+
用Reed Solomon纠删码进行冗余,每份数据分成若干份,每份存储在互不相同的结点上。读取的时候进行文件完整性检查和修复。
13+
修复
14+
15+
### 断点续传
16+
支持断点下载和上传。下载时http头部包含偏移量。上传为了支持哈希校验,在上传完成后才开始校验。
17+
18+
### gzip压缩
19+
节省空间,数据服务器存储存储压缩数据,下载时解压之后再发送出去。
20+
21+
## 改进
22+
加入Nginx进行负载均衡

apiServer/apiServer.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"ObjectStorage/apiServer/heartbeat"
55
"ObjectStorage/apiServer/locate"
66
"ObjectStorage/apiServer/object"
7+
"ObjectStorage/apiServer/temp"
78
"ObjectStorage/apiServer/versions"
89
"log"
910
"net/http"
@@ -18,6 +19,8 @@ func main() {
1819
http.HandleFunc("/locate/", locate.Handler)
1920
//Get版本所有信息,直接查询元数据服务器
2021
http.HandleFunc("/versions/", versions.Handler)
22+
//
23+
http.HandleFunc("/temp/", temp.Handler)
2124

2225
log.Fatalln(http.ListenAndServe(os.Getenv("LISTEN_ADDRESS"), nil))
2326
}

apiServer/heartbeat/choose.go

Lines changed: 23 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,29 @@ package heartbeat
22

33
import "math/rand"
44

5-
func ChooseRandomDataServer() string {
5+
func ChooseRandomDataServers(n int, exclude map[int]string) (ds []string) {
6+
candidates := make([]string, 0)
7+
//exclude the server which has the data
8+
reverseExcludeServers := make(map[string]int)
9+
for id, addr := range exclude {
10+
reverseExcludeServers[addr] = id
11+
}
12+
//get current data servers that online
613
servers := GetDataServers()
7-
if len(servers) == 0 {
8-
return ""
14+
for _, s := range servers {
15+
//judge the server exclude or include
16+
_, excluded := reverseExcludeServers[s]
17+
if !excluded {
18+
candidates = append(candidates, s)
19+
}
20+
}
21+
length := len(candidates)
22+
if length < n {
23+
return
24+
}
25+
p := rand.Perm(length)
26+
for i := 0; i < n; i++ {
27+
ds = append(ds, candidates[p[i]])
928
}
10-
return servers[rand.Intn(len(servers))]
29+
return
1130
}

apiServer/locate/locate.go

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,24 +2,35 @@ package locate
22

33
import (
44
"ObjectStorage/src/lib/rabbitmq"
5+
"ObjectStorage/src/lib/rs"
6+
"ObjectStorage/src/lib/types"
7+
"encoding/json"
58
"os"
6-
"strconv"
79
"time"
810
)
911

10-
func Locate(name string) string {
12+
func Locate(name string) map[int]string {
1113
q := rabbitmq.New(os.Getenv("RABBITMQ_SERVER"))
1214
q.Publish("dataServers", name)
1315
c := q.Consume()
1416
go func() {
1517
time.Sleep(1 * time.Second)
1618
q.Close()
1719
}()
18-
msg := <-c
19-
s, _ := strconv.Unquote(string(msg.Body))
20-
return s
20+
21+
locateInfo := make(map[int]string)
22+
for i := 0; i < rs.ALL_SHARDS; i++ {
23+
msg := <-c
24+
if len(msg.Body) == 0 {
25+
return locateInfo
26+
}
27+
var info types.LocateMessage
28+
json.Unmarshal(msg.Body, &info)
29+
locateInfo[info.Id] = info.Addr
30+
}
31+
return locateInfo
2132
}
2233

2334
func Exist(name string) bool {
24-
return Locate(name) != ""
35+
return len(Locate(name)) >= rs.DATA_SHARDS
2536
}

apiServer/object/get.go

Lines changed: 39 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,9 @@ package object
22

33
import (
44
"ObjectStorage/src/lib/es"
5+
"ObjectStorage/src/lib/utils"
6+
"compress/gzip"
7+
"fmt"
58
"io"
69
"log"
710
"net/http"
@@ -11,36 +14,56 @@ import (
1114
)
1215

1316
func get(w http.ResponseWriter, r *http.Request) {
14-
//1.从元数据服务器上获取文件名贺版本号
15-
version := 0
1617
name := strings.Split(r.URL.EscapedPath(), "/")[2]
1718
versionId := r.URL.Query()["version"]
18-
var err error
19+
version := 0
20+
var e error
1921
if len(versionId) != 0 {
20-
version, err = strconv.Atoi(versionId[0])
21-
if err != nil {
22-
log.Println(err)
23-
w.WriteHeader(http.StatusInternalServerError)
22+
version, e = strconv.Atoi(versionId[0])
23+
if e != nil {
24+
log.Println(e)
25+
w.WriteHeader(http.StatusBadRequest)
2426
return
2527
}
2628
}
27-
//2.查看数据是否被删除
28-
meta, err := es.GetMetadata(name, version)
29-
if err != nil {
30-
log.Println(err)
29+
meta, e := es.GetMetadata(name, version)
30+
if e != nil {
31+
log.Println(e)
3132
w.WriteHeader(http.StatusInternalServerError)
3233
return
3334
}
3435
if meta.Hash == "" {
3536
w.WriteHeader(http.StatusNotFound)
3637
return
3738
}
38-
//3.若数据存在则向dataServer发起请求
39-
stream, err := getStream(url.PathEscape(meta.Hash))
40-
if err != nil {
41-
log.Println(err)
39+
hash := url.PathEscape(meta.Hash)
40+
stream, e := GetStream(hash, meta.Size)
41+
if e != nil {
42+
log.Println(e)
4243
w.WriteHeader(http.StatusNotFound)
4344
return
4445
}
45-
io.Copy(w, stream)
46+
offset := utils.GetOffsetFromHeader(r.Header)
47+
if offset != 0 {
48+
stream.Seek(offset, io.SeekCurrent)
49+
w.Header().Set("content-range", fmt.Sprintf("bytes %d-%d/%d", offset, meta.Size-1, meta.Size))
50+
w.WriteHeader(http.StatusPartialContent)
51+
}
52+
acceptGzip := false
53+
encoding := r.Header["Accept-Encoding"]
54+
for i := range encoding {
55+
if encoding[i] == "gzip" {
56+
acceptGzip = true
57+
break
58+
}
59+
}
60+
if acceptGzip {
61+
w.Header().Set("content-encoding", "gzip")
62+
w2 := gzip.NewWriter(w)
63+
io.Copy(w2, stream)
64+
w2.Close()
65+
} else {
66+
io.Copy(w, stream)
67+
}
68+
stream.Close()
4669
}

0 commit comments

Comments
 (0)