11/**
2- * Copyright (C) 2011 Cloud.com, Inc. All rights reserved.
2+ * * Copyright (C) 2011 Citrix Systems, Inc. All rights reserved
3+ *
4+ *
35 */
46
57package com .cloud .usage ;
68
79import java .net .InetAddress ;
10+ import java .sql .SQLException ;
811import java .util .Calendar ;
912import java .util .Date ;
1013import java .util .HashMap ;
3639import com .cloud .usage .dao .UsageStorageDao ;
3740import com .cloud .usage .dao .UsageVMInstanceDao ;
3841import com .cloud .usage .dao .UsageVolumeDao ;
42+ import com .cloud .usage .parser .IPAddressUsageParser ;
3943import com .cloud .usage .parser .LoadBalancerUsageParser ;
4044import com .cloud .usage .parser .NetworkOfferingUsageParser ;
4145import com .cloud .usage .parser .NetworkUsageParser ;
@@ -90,16 +94,19 @@ public class UsageManagerImpl implements UsageManager, Runnable {
9094 private String m_version = null ;
9195 private String m_name = null ;
9296 private final Calendar m_jobExecTime = Calendar .getInstance ();
93- private int m_aggregationDuration = 0 ;
97+ private int m_aggregationDuration = 0 ;
98+ private int m_sanityCheckInterval = 0 ;
9499 String m_hostname = null ;
95100 int m_pid = 0 ;
96101 TimeZone m_usageTimezone = TimeZone .getTimeZone ("GMT" );;
97102 private final GlobalLock m_heartbeatLock = GlobalLock .getInternLock ("usage.job.heartbeat.check" );
98103
99104 private final ScheduledExecutorService m_executor = Executors .newSingleThreadScheduledExecutor (new NamedThreadFactory ("Usage-Job" ));
100- private final ScheduledExecutorService m_heartbeatExecutor = Executors .newSingleThreadScheduledExecutor (new NamedThreadFactory ("Usage-HB" ));
105+ private final ScheduledExecutorService m_heartbeatExecutor = Executors .newSingleThreadScheduledExecutor (new NamedThreadFactory ("Usage-HB" ));
106+ private final ScheduledExecutorService m_sanityExecutor = Executors .newSingleThreadScheduledExecutor (new NamedThreadFactory ("Usage-Sanity" ));
101107 private Future m_scheduledFuture = null ;
102- private Future m_heartbeat = null ;
108+ private Future m_heartbeat = null ;
109+ private Future m_sanity = null ;
103110
104111 protected UsageManagerImpl () {
105112 }
@@ -144,8 +151,12 @@ public boolean configure(String name, Map<String, Object> params) throws Configu
144151
145152 String execTime = configs .get ("usage.stats.job.exec.time" );
146153 String aggregationRange = configs .get ("usage.stats.job.aggregation.range" );
147- String execTimeZone = configs .get ("usage.execution.timezone" );
154+ String execTimeZone = configs .get ("usage.execution.timezone" );
148155 String aggreagationTimeZone = configs .get ("usage.aggregation.timezone" );
156+ String sanityCheckInterval = configs .get ("usage.sanity.check.interval" );
157+ if (sanityCheckInterval != null ){
158+ m_sanityCheckInterval = Integer .parseInt (sanityCheckInterval );
159+ }
149160
150161 m_usageTimezone = TimeZone .getTimeZone (aggreagationTimeZone );
151162 s_logger .debug ("Usage stats aggregation time zone: " +aggreagationTimeZone );
@@ -210,7 +221,11 @@ public boolean start() {
210221 // use the configured exec time and aggregation duration for scheduling the job
211222 m_scheduledFuture = m_executor .scheduleAtFixedRate (this , m_jobExecTime .getTimeInMillis () - System .currentTimeMillis (), m_aggregationDuration * 60 * 1000 , TimeUnit .MILLISECONDS );
212223
213- m_heartbeat = m_heartbeatExecutor .scheduleAtFixedRate (new Heartbeat (), /* start in 15 seconds...*/ 15 *1000 , /* check database every minute*/ 60 *1000 , TimeUnit .MILLISECONDS );
224+ m_heartbeat = m_heartbeatExecutor .scheduleAtFixedRate (new Heartbeat (), /* start in 15 seconds...*/ 15 *1000 , /* check database every minute*/ 60 *1000 , TimeUnit .MILLISECONDS );
225+
226+ if (m_sanityCheckInterval > 0 ){
227+ m_sanity = m_sanityExecutor .scheduleAtFixedRate (new SanityCheck (), 1 , m_sanityCheckInterval , TimeUnit .DAYS );
228+ }
214229
215230 Transaction usageTxn = Transaction .open (Transaction .USAGE_DB );
216231 try {
@@ -236,7 +251,8 @@ public boolean start() {
236251
237252 public boolean stop () {
238253 m_heartbeat .cancel (true );
239- m_scheduledFuture .cancel (true );
254+ m_scheduledFuture .cancel (true );
255+ m_sanity .cancel (true );
240256 return true ;
241257 }
242258
@@ -710,6 +726,13 @@ private boolean parseHelperTables(AccountVO account, Date currentStartDate, Date
710726 if (!parsed ) {
711727 s_logger .debug ("network offering usage successfully parsed? " + parsed + " (for account: " + account .getAccountName () + ", id: " + account .getId () + ")" );
712728 }
729+ }
730+
731+ parsed = IPAddressUsageParser .parse (account , currentStartDate , currentEndDate );
732+ if (s_logger .isDebugEnabled ()) {
733+ if (!parsed ) {
734+ s_logger .debug ("IPAddress usage successfully parsed? " + parsed + " (for account: " + account .getAccountName () + ", id: " + account .getId () + ")" );
735+ }
713736 }
714737
715738 return parsed ;
@@ -818,7 +841,16 @@ private void createVMHelperEvent(UsageEventVO event) {
818841 usageInstance .setEndDate (event .getCreateDate ());
819842 m_usageInstanceDao .update (usageInstance );
820843 }
821- }
844+ }
845+ }
846+
847+ sc = m_usageInstanceDao .createSearchCriteria ();
848+ sc .addAnd ("vmInstanceId" , SearchCriteria .Op .EQ , Long .valueOf (vmId ));
849+ sc .addAnd ("endDate" , SearchCriteria .Op .NULL );
850+ sc .addAnd ("usageType" , SearchCriteria .Op .EQ , UsageTypes .ALLOCATED_VM );
851+ usageInstances = m_usageInstanceDao .search (sc , null );
852+ if (usageInstances == null || (usageInstances .size () == 0 )) {
853+ s_logger .error ("Cannot find allocated vm entry for a vm running with id: " + vmId );
822854 }
823855
824856 Long templateId = event .getTemplateId ();
@@ -989,15 +1021,30 @@ private void createVolumeHelperEvent(UsageEventVO event) {
9891021 size = event .getSize ();
9901022 }
9911023
992- if (EventTypes .EVENT_VOLUME_CREATE .equals (event .getType ())) {
1024+ if (EventTypes .EVENT_VOLUME_CREATE .equals (event .getType ())) {
1025+ SearchCriteria <UsageVolumeVO > sc = m_usageVolumeDao .createSearchCriteria ();
1026+ sc .addAnd ("accountId" , SearchCriteria .Op .EQ , event .getAccountId ());
1027+ sc .addAnd ("id" , SearchCriteria .Op .EQ , volId );
1028+ sc .addAnd ("deleted" , SearchCriteria .Op .NULL );
1029+ List <UsageVolumeVO > volumesVOs = m_usageVolumeDao .search (sc , null );
1030+ if (volumesVOs .size () > 0 ) {
1031+ //This is a safeguard to avoid double counting of volumes.
1032+ s_logger .error ("Found duplicate usage entry for volume: " + volId + " assigned to account: " + event .getAccountId () + "; marking as deleted..." );
1033+ }
1034+ for (UsageVolumeVO volumesVO : volumesVOs ) {
1035+ if (s_logger .isDebugEnabled ()) {
1036+ s_logger .debug ("deleting volume: " + volumesVO .getId () + " from account: " + volumesVO .getAccountId ());
1037+ }
1038+ volumesVO .setDeleted (event .getCreateDate ());
1039+ m_usageVolumeDao .update (volumesVO );
1040+ }
9931041 if (s_logger .isDebugEnabled ()) {
9941042 s_logger .debug ("create volume with id : " + volId + " for account: " + event .getAccountId ());
9951043 }
9961044 Account acct = m_accountDao .findByIdIncludingRemoved (event .getAccountId ());
9971045 UsageVolumeVO volumeVO = new UsageVolumeVO (volId , zoneId , event .getAccountId (), acct .getDomainId (), doId , templateId , size , event .getCreateDate (), null );
9981046 m_usageVolumeDao .persist (volumeVO );
9991047 } else if (EventTypes .EVENT_VOLUME_DELETE .equals (event .getType ())) {
1000- // at this point it's not a sourceNat IP, so find the usage record with this IP and a null released date, update the released date
10011048 SearchCriteria <UsageVolumeVO > sc = m_usageVolumeDao .createSearchCriteria ();
10021049 sc .addAnd ("accountId" , SearchCriteria .Op .EQ , event .getAccountId ());
10031050 sc .addAnd ("id" , SearchCriteria .Op .EQ , volId );
@@ -1344,5 +1391,21 @@ protected void deleteOneOffJobs(String hostname, int pid) {
13441391 sc .addAnd ("scheduled" , SearchCriteria .Op .EQ , Integer .valueOf (0 ));
13451392 m_usageJobDao .expunge (sc );
13461393 }
1394+ }
1395+
1396+ private class SanityCheck implements Runnable {
1397+ public void run () {
1398+ UsageSanityChecker usc = new UsageSanityChecker ();
1399+ try {
1400+ String errors = usc .runSanityCheck ();
1401+ if (errors .length () > 0 ){
1402+ _alertMgr .sendAlert (AlertManager .ALERT_TYPE_USAGE_SANITY_RESULT , 0 , new Long (0 ), "Usage Sanity Check failed" , errors );
1403+ } else {
1404+ _alertMgr .clearAlert (AlertManager .ALERT_TYPE_USAGE_SANITY_RESULT , 0 , 0 );
1405+ }
1406+ } catch (SQLException e ) {
1407+ s_logger .error ("Error in sanity check" , e );
1408+ }
1409+ }
13471410 }
13481411}
0 commit comments