diff --git a/daemon/namenode/namenode.go b/daemon/namenode/namenode.go
index a191827..b2f7582 100644
--- a/daemon/namenode/namenode.go
+++ b/daemon/namenode/namenode.go
@@ -1,9 +1,12 @@
 package namenode
 
 import (
+	"bufio"
+	"io/ioutil"
 	"log"
 	"net"
 	"net/rpc"
+	"os"
 	"strconv"
 	"time"
 
@@ -12,6 +15,9 @@ import (
 	"github.com/rounakdatta/GoDFS/util"
 )
 
+// NameNodeState is the file that the serialized NameNode state is flushed to.
+const NameNodeState = "NN.STATE"
+
 func removeElementFromSlice(elements []string, index int) []string {
 	return append(elements[:index], elements[index+1:]...)
 }
@@ -61,7 +67,7 @@ func discoverDataNodes(nameNodeInstance *namenode.Service, listOfDataNodes *[]st
 	return nil
 }
 
-func InitializeNameNodeUtil(serverPort int, blockSize int, replicationFactor int, listOfDataNodes []string) {
+func InitializeNameNodeUtil(serverPort int, blockSize int, replicationFactor int, isPrimary bool, listOfDataNodes []string) {
 	nameNodeInstance := namenode.NewService(uint64(blockSize), uint64(replicationFactor), uint16(serverPort))
 	err := discoverDataNodes(nameNodeInstance, &listOfDataNodes)
 	util.Check(err)
@@ -71,7 +77,14 @@ func InitializeNameNodeUtil(serverPort int, blockSize int, replicationFactor int
 	log.Printf("List of DataNode(s) in service is %q\n", listOfDataNodes)
 	log.Printf("NameNode port is %d\n", serverPort)
 
-	go heartbeatToDataNodes(listOfDataNodes, nameNodeInstance)
+	if isPrimary {
+		log.Printf("Starting processes for primary NameNode\n")
+		go heartbeatToDataNodes(listOfDataNodes, nameNodeInstance)
+		go scheduledFlushToDisk(nameNodeInstance)
+	} else {
+		log.Printf("Starting processes for secondary NameNode\n")
+		go getServiceStateFromPrimary(nameNodeInstance, listOfDataNodes)
+	}
 
 	err = rpc.Register(nameNodeInstance)
 	util.Check(err)
@@ -114,3 +127,76 @@ func heartbeatToDataNodes(listOfDataNodes []string, nameNode *namenode.Service)
 		}
 	}
 }
+
+// scheduledFlushToDisk flushes the NameNode state to disk every 10 seconds.
+// It never returns, so callers run it in its own goroutine.
+func scheduledFlushToDisk(namenode *namenode.Service) {
+	for range time.Tick(time.Second * 10) {
+		flushServiceStateToDisk(namenode)
+	}
+}
+
+// flushServiceStateToDisk serializes the given NameNode service and writes it
+// to the NameNodeState file, replacing any previous contents.
+func flushServiceStateToDisk(namenode *namenode.Service) {
+	state, err := SerializeNameNodeImage(namenode)
+	util.Check(err)
+
+	fileWriteHandler, err := os.Create(NameNodeState)
+	util.Check(err)
+
+	fileWriter := bufio.NewWriter(fileWriteHandler)
+	_, err = fileWriter.WriteString(state)
+	util.Check(err)
+	// Flush and Close can both report that the data did not reach the file,
+	// so their errors are checked instead of dropped.
+	util.Check(fileWriter.Flush())
+	util.Check(fileWriteHandler.Close())
+	log.Println("NameNodeService state flushed to disk")
+}
+
+// getServiceStateFromPrimary polls the primary NameNode every 2 seconds and
+// writes the state it returns to disk. When the primary cannot be reached, the
+// state last written to disk is loaded into namenodeInstance, the background
+// processes of a primary are started and the function returns.
+func getServiceStateFromPrimary(namenodeInstance *namenode.Service, listOfDataNodes []string) {
+	primaryNameNodeHost := "localhost"
+	const primaryNameNodePort = "9000"
+
+	// This function returns on failover, so the ticker is stopped explicitly.
+	ticker := time.NewTicker(time.Second * 2)
+	defer ticker.Stop()
+
+	for range ticker.C {
+		primaryNameNodeClient, connectionErr := rpc.Dial("tcp", primaryNameNodeHost+":"+primaryNameNodePort)
+
+		if connectionErr != nil {
+			log.Printf("Unable to connect to NameNode on %s, starting recovery\n", primaryNameNodePort)
+			dataBytes, err := ioutil.ReadFile(NameNodeState)
+			util.Check(err)
+
+			recoveredState, err := DeserializeNameNodeImage(string(dataBytes))
+			util.Check(err)
+			// Copy into the instance the caller holds; rebinding the local
+			// pointer would leave that instance without the recovered state.
+			*namenodeInstance = *recoveredState
+			go heartbeatToDataNodes(listOfDataNodes, namenodeInstance)
+			go scheduledFlushToDisk(namenodeInstance)
+			return
+		}
+
+		var response *namenode.Service
+		stateFetchErr := primaryNameNodeClient.Call("Service.GetState", true, &response)
+		// A new connection is dialed on every tick, so release this one.
+		if closeErr := primaryNameNodeClient.Close(); closeErr != nil {
+			log.Printf("Error closing connection to primary NameNode: %v\n", closeErr)
+		}
+		if stateFetchErr != nil {
+			log.Println("Error fetching state from primary NameNode")
+			// No state was received; keep the previous snapshot on disk.
+			continue
+		}
+
+		flushServiceStateToDisk(response)
+	}
+}
diff --git a/daemon/namenode/serde.go b/daemon/namenode/serde.go
new file mode 100644
index 0000000..aa05c46
--- /dev/null
+++ b/daemon/namenode/serde.go
@@ -0,0 +1,36 @@
+package namenode
+
+import (
+	"bytes"
+	"encoding/base64"
+	"encoding/gob"
+
+	"github.com/rounakdatta/GoDFS/namenode"
+)
+
+// SerializeNameNodeImage gob-encodes the given NameNode service and returns
+// the encoded bytes as a standard base64 string.
+func SerializeNameNodeImage(image *namenode.Service) (string, error) {
+	var buf bytes.Buffer
+	if err := gob.NewEncoder(&buf).Encode(image); err != nil {
+		return "", err
+	}
+
+	return base64.StdEncoding.EncodeToString(buf.Bytes()), nil
+}
+
+// DeserializeNameNodeImage reverses SerializeNameNodeImage: it base64-decodes
+// serializedImage and gob-decodes the result into a new NameNode service.
+func DeserializeNameNodeImage(serializedImage string) (*namenode.Service, error) {
+	data, err := base64.StdEncoding.DecodeString(serializedImage)
+	if err != nil {
+		return nil, err
+	}
+
+	var image namenode.Service
+	if err := gob.NewDecoder(bytes.NewReader(data)).Decode(&image); err != nil {
+		return nil, err
+	}
+
+	return &image, nil
+}
diff --git a/main.go b/main.go
index ed9e8fb..16712af 100644
--- a/main.go
+++ b/main.go
@@ -2,12 +2,13 @@ package main
 
 import (
 	"flag"
-	"github.com/rounakdatta/GoDFS/daemon/client"
-	"github.com/rounakdatta/GoDFS/daemon/datanode"
-	"github.com/rounakdatta/GoDFS/daemon/namenode"
 	"log"
 	"os"
 	"strings"
+
+	"github.com/rounakdatta/GoDFS/daemon/client"
+	"github.com/rounakdatta/GoDFS/daemon/datanode"
+	"github.com/rounakdatta/GoDFS/daemon/namenode"
 )
 
 func main() {
@@ -22,6 +23,7 @@ func main() {
 	nameNodeListPtr := nameNodeCommand.String("datanodes", "", "Comma-separated list of DataNodes to connect to")
 	nameNodeBlockSizePtr := nameNodeCommand.Int("block-size", 32, "Block size to store")
 	nameNodeReplicationFactorPtr := nameNodeCommand.Int("replication-factor", 1, "Replication factor of the system")
+	nameNodeIsPrimaryPtr := nameNodeCommand.Bool("is-primary", true, "If this NameNode is primary")
 
 	clientNameNodePortPtr := clientCommand.String("namenode", "localhost:9000", "NameNode communication port")
 	clientOperationPtr := clientCommand.String("operation", "", "Operation to perform")
@@ -46,7 +48,7 @@ func main() {
 		} else {
 			listOfDataNodes = []string{}
 		}
-		namenode.InitializeNameNodeUtil(*nameNodePortPtr, *nameNodeBlockSizePtr, *nameNodeReplicationFactorPtr, listOfDataNodes)
+		namenode.InitializeNameNodeUtil(*nameNodePortPtr, *nameNodeBlockSizePtr, *nameNodeReplicationFactorPtr, *nameNodeIsPrimaryPtr, listOfDataNodes)
 
 	case "client":
 		_ = clientCommand.Parse(os.Args[2:])
diff --git a/namenode/namenode.go b/namenode/namenode.go
index ff4467f..bbfc731 100644
--- a/namenode/namenode.go
+++ b/namenode/namenode.go
@@ -1,14 +1,15 @@
 package namenode
 
 import (
-	"github.com/google/uuid"
-	"github.com/rounakdatta/GoDFS/datanode"
-	"github.com/rounakdatta/GoDFS/util"
 	"log"
 	"math"
 	"math/rand"
 	"net/rpc"
 	"strings"
+
+	"github.com/google/uuid"
+	"github.com/rounakdatta/GoDFS/datanode"
+	"github.com/rounakdatta/GoDFS/util"
 )
 
 type NameNodeMetaData struct {
@@ -67,6 +68,17 @@ func selectRandomNumbers(availableItems []uint64, count uint64) (randomNumberSet
 	return
 }
 
+// GetState is an RPC method that copies the NameNode's current state into
+// reply when request is true; otherwise reply is left untouched.
+func (nameNode *Service) GetState(request bool, reply *Service) error {
+	if request {
+		// Assign through the pointer: rebinding reply would only change the
+		// local variable and the caller would never see the state.
+		*reply = *nameNode
+	}
+	return nil
+}
+
 func (nameNode *Service) GetBlockSize(request bool, reply *uint64) error {
 	if request {
 		*reply = nameNode.BlockSize